package com.cyx.netty.c3;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

/*
MultiThreadServer优化
直接使用wakeup()解决，不需使用ConcurrentLinkedQueue

单个worker / 多个worker
注：多worker时获取cpu个数
Runtime.getRuntime().availableProcessors()如果工作在docker容器中，因为容器不是物理隔离的，会拿到物理cpu个数，而不是容器申请时的个数
这个问题知道jdk10才修复，使用jvm参数UseContainerSupport配置，默认开启
 */
public class MultiThreadServer1 {
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT); 
        ssc.bind(new InetSocketAddress(8888));

        // 单个worker
//        Worker worker = new Worker("worker01");
        // 多个worker 创建固定数量的worker并初始化
        Worker[] workers = new Worker[3];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker-" + i);
        }
        AtomicInteger index = new AtomicInteger();
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    // 单个worker
                    // worker.register(sc);
                    // 多个worker
                    workers[index.getAndIncrement() % workers.length].register(sc);
                }
            }
        }
    }

    static class Worker implements Runnable{
        private String name;
        private Selector selector;
        private volatile boolean start = false;

        public Worker(String name) {
            this.name = name;
        }

        public void register(SocketChannel channel) throws IOException{
            if (!start) {
                selector = Selector.open();
                new Thread(this, name).start();
                start = true;
            }
            channel.register(selector, SelectionKey.OP_READ, null);
            selector.wakeup(); // 使selector.select()阻塞时立即返回
        }

        @Override
        public void run() {
            while (true) {
                try {
                    selector.select();
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isReadable()) {
                            SocketChannel sc = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            sc.read(buffer);
                            buffer.flip();
                            out(buffer);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        public void out(ByteBuffer buffer) {
            while (buffer.hasRemaining()) {
                byte b = buffer.get();
                System.out.print(((char) b));
            }
        }
    }
}
